package com.wfg.flink.connector.mongodb;


import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.redis.RedisExampleMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.bson.BsonDocument;

import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;

/**
 * @author wfg
 * 根据名字统计访问次数
 */
public class MongoAllNameRedisCounts {

    public static void main(String[] args) throws Exception {
        String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");
        System.out.println("StartTime:" + startTime);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启Checkpointing，设置Checkpoint间隔
        env.enableCheckpointing(30000);
        // 设置Checkpoint模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 设置最小Checkpoint间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 设置最大并发Checkpoint数目
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 使用RocksDB作为状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.setParallelism(10);

        // 配置MongoDB源
        MongoSource<String> mongoSource = MongoSource.<String>builder()
                .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                .setDatabase("sjzz")
                .setCollection(MONGO_TEST_PV_COLLECTION)
                .setFetchSize(2048)
//                .setLimit(1000)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SINGLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
//                .setSamplesPerPartition(10)
                .setDeserializationSchema(new MongoDeserializationSchema<>() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();
        // 创建MongoDB数据流
        DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据，提取人名作为Key
        DataStream<Tuple2<String, Integer>> nameCountStream = sourceStream
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);
                        return Tuple2.of(data.getUserName(), 1);
                    }
                })
                .keyBy(value -> value.f0)
                .reduce(new ReduceFunction<>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        RedisSink<Tuple2<String, Integer>> sink = new RedisSink<>(conf, new RedisExampleMapper());
        nameCountStream.addSink(sink);
        // 输出结果
        env.execute("Flink MongoDB Name Count ");
        System.out.println("-----------------------------------");
        System.out.println("startTime: " + startTime);
        System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));
    }
}
